1 /*
2 Copyright: Marcelo S. N. Mancini (Hipreme|MrcSnm), 2018 - 2021
3 License:   [https://creativecommons.org/licenses/by/4.0/|CC BY-4.0 License].
4 Authors: Marcelo S. N. Mancini
5 
6 	Copyright Marcelo S. N. Mancini 2018 - 2021.
7 Distributed under the CC BY-4.0 License.
8    (See accompanying file LICENSE.txt or copy at
9 	https://creativecommons.org/licenses/by/4.0/
10 */
11 module hip.util.concurrency;
12 
13 
// Decides whether the thread-backed implementation (`version(HipConcurrency)`) is compiled.
// When CustomRuntimeTest is set, HipConcurrency is never defined here, so the
// `else` branch of `version(HipConcurrency)` further down is compiled instead.
version(CustomRuntimeTest){}
else
{
    // Targets for which HipConcurrency is enabled.
    version(Windows) version = HipConcurrency;
    version(Android) version = HipConcurrency;
    version(UWP) version = HipConcurrency;
    version(linux) version = HipConcurrency;
}
22 
23 
24 
25 version(HipConcurrency)
26 {
27 
28     /**
29     *   Test for wrapping atomic operations in a structure
30     */
31     struct Atomic(T)
32     {
33         import core.atomic;
34         private T value;
35 
36         auto opAssign(T)(T value)
37         {       
38             atomicStore(this.value, value);
39             return value;
40         }
41         private @property T v(){return atomicLoad(value);}
42         alias v this;
43 
44     }
45 
46     ///Tries to implement a `volatile` java style
47     struct Volatile(T)
48     {
49         import core..volatile;
50         private T value;
51 
52         auto synchronized opAssign(T)(T value)
53         {    
54             volatileStore(&this.value, value);
55             return value;
56         }
57         private @property synchronized T v(){return volatileLoad(value);}
58         alias v this;   
59     }
60 
61     import core.thread;
62     import core.sync.mutex : Mutex;
63     import core.sync.semaphore:Semaphore;
64 
65     class DebugMutex
66     {
67         private string lastFileLock;
68         private size_t lastLineLock;
69         private ThreadID lastID;
70 
71         private string lastFileUnlock;
72         private size_t lastLineUnlock;
73 
74         private Mutex mtx;
75 
76         private ThreadID mainThreadId;
77 
78         this(ThreadID mainId = ThreadID.init)
79         {
80             this.mainThreadId = mainId;
81             mtx = new Mutex();
82         }
83         void lock(string file = __FILE__, size_t line = __LINE__)
84         {
85             import std.process:thisThreadID;
86             if(lastLineLock == 0)
87             {
88                 lastLineUnlock = 0;
89                 lastFileUnlock = null;
90 
91                 lastFileLock = file;
92                 lastLineLock = line;
93                 lastID = thisThreadID;
94             }
95             else
96             {
97                 version(Desktop)
98                 {
99                     import std.stdio;
100                     import hip.util.conv:to;
101                     string last = (lastID == mainThreadId ? "Main " : "") ~ "Thread("~to!string(lastID)~")";
102                     string curr = (thisThreadID == mainThreadId ? "Main " : "") ~ "Thread("~to!string(thisThreadID)~")";
103 
104                     writeln("Tried to lock a locked mutex at ", file, ":", line,
105                     "\n\tLast locked at ", lastFileLock, ":",lastLineLock, " ", last, 
106                     " Current Thread is ",curr
107                     );
108                 }
109             }
110             mtx.lock();
111         }
112         void unlock(string file = __FILE__, size_t line = __LINE__)
113         {
114             version(Desktop)
115             {
116                 import std.process:thisThreadID;
117                 if(lastLineLock == 0)
118                 {
119                     import std.stdio;
120                     import hip.util.conv:to;
121                     string last = (lastID == mainThreadId ? "Main " : "") ~ "Thread("~to!string(lastID)~")";
122                     string curr = (thisThreadID == mainThreadId ? "Main " : "") ~ "Thread("~to!string(thisThreadID)~")";
123                     
124                     writeln(
125                         "Tried to unlock an unlocked mutex at ", file, ":", line, 
126                         "\n\tLast unlocked at ",  lastFileUnlock, ":",lastLineUnlock, " ", last,
127                         " Current Thread is ",curr
128                     );
129                     // throw new Error("Tried to unlock an unlocked mutex");
130                 }
131             }
132             lastLineUnlock = line;
133             lastFileUnlock = file;
134             lastFileLock = null;
135             lastLineLock = 0;
136             mtx.unlock();
137         }
138 
139     }
140 
141     class HipWorkerThread : Thread
142     {
143         private struct WorkerJob
144         {
145             string name;
146             void delegate() task;
147             void delegate(string taskName) onTaskFinish;
148         }
149         private WorkerJob[] jobsQueue;
150         private Semaphore semaphore;
151         private bool isAlive;
152         private DebugMutex mutex;
153         private HipWorkerPool pool;
154         private ThreadID mainThreadID;
155 
156 
157         this(HipWorkerPool pool = null, ThreadID mainThreadID = ThreadID.init)
158         {
159             super(&run);
160             if(pool)
161                 this.pool = pool;
162             isAlive = true;
163             semaphore = new Semaphore;
164             this.mainThreadID = mainThreadID;
165             mutex = new DebugMutex(mainThreadID);
166         }
167         /**
168         *   This thread goes into an invalid state after finishing it. It should not be used anymore
169         */
170         void finish()
171         {
172             mutex.lock();
173             isAlive = false;
174             semaphore.notify;
175             mutex.unlock();
176         }
177         bool isIdle()
178         {
179             mutex.lock();
180             bool ret = isIdleImpl();
181             mutex.unlock();
182             return ret;
183         }
184         private bool isIdleImpl()
185         {
186             return jobsQueue.length == 0;
187         }
188         /**
189         *   Synchronized push on queue
190         */
191         void pushTask(string name, void delegate() task, void delegate(string taskName) onTaskFinish = null)
192         {
193             if(isAlive)
194             {
195                 mutex.lock();
196                 jobsQueue~= WorkerJob(name, task, onTaskFinish);
197                 mutex.unlock();
198                 semaphore.notify();
199             }
200             else
201             {
202                 import std.stdio;
203                 writeln("Thread is not alive to get tasks.");
204             }
205         }
206 
207         void startWorking()
208         {
209             if(!isRunning)
210                 start();
211         }
212         void await(bool rethrow = true)
213         {
214             // pushTask("await", () => finish);
215             // join(rethrow);
216         }
217 
218         void run()
219         {
220             while(isAlive)
221             {
222                 mutex.lock();
223                 if(!isIdleImpl)
224                 {
225                     WorkerJob job = jobsQueue[0];
226                     mutex.unlock();
227                     import std.stdio;
228                     try
229                     {
230                         mutex.lock();
231                         job.task();
232                         if(job.onTaskFinish != null)
233                         {
234                             job.onTaskFinish(job.name);
235                         }
236                         mutex.unlock();
237                     }
238                     catch(Error e)
239                     {
240                         onAnyException(true, e.toString());
241                         return;
242                     }
243                     catch(Exception e)
244                     {
245                         onAnyException(false, e.toString());
246                         return;
247                     }
248                     mutex.lock();
249                     jobsQueue = jobsQueue[1..$];
250                     mutex.unlock();
251                 }
252                 else
253                     mutex.unlock();
254                 semaphore.wait;
255             }
256         }
257 
258         private void onAnyException(bool isError, string message)
259         {
260             import std.stdio;
261             isAlive = false;
262             if(pool)
263                 pool.onHipThreadError(this, isError,message);
264         }
265         void dispose()
266         {
267             finish();
268             destroy(semaphore);
269             destroy(mutex);
270         }
271     }
272 
273 
274     class HipWorkerPool
275     {
276         HipWorkerThread[] threads;
277         protected Semaphore awaitSemaphore;
278         protected void delegate()[] finishHandlersOnMainThread;
279         protected void delegate()[] onAllTasksFinishHandlers;
280         protected DebugMutex handlersMutex;
281 
282         private struct Task
283         {
284             string name;
285             void delegate() task;
286             void delegate(string taskName) onTaskFinish = null;
287         }
288         private Task[] mainThreadTasks;
289         private uint awaitCount = 0;
290         private size_t tasksCount;
291 
292 
293         this(size_t poolSize)
294         {
295             threads = new HipWorkerThread[](poolSize);
296             import std.process:thisThreadID;
297             auto mainId = thisThreadID;
298             handlersMutex = new DebugMutex(mainId);
299             for(size_t i = 0; i < poolSize; i++)
300                 threads[i] = new HipWorkerThread(this, mainId);
301             awaitSemaphore = new Semaphore(0);
302         }
303 
304         void addOnAllTasksFinished(void delegate() onAllFinished)
305         {
306             if(tasksCount == 0)
307                 onAllFinished();
308             else
309                 onAllTasksFinishHandlers~= onAllFinished;
310         }
311 
312         protected void onHipThreadError(HipWorkerThread worker, bool isError, string message)
313         {
314             if(awaitCount > 0)
315             {
316                 awaitSemaphore.notify();
317             }
318             import hip.util.array;
319             import std.stdio;
320             writeln("Worker ", worker.jobsQueue[0].name, " failed with ", isError ? "error" : "exception", ":", message);
321             threads.remove(worker);
322         }
323         void await()
324         {
325             awaitCount = 0;
326             foreach(thread; threads)
327             {
328                 if(!thread.isIdle)
329                 {
330                     thread.pushTask("Await", ()
331                     {
332                         awaitSemaphore.notify;
333                     });
334                     awaitCount++;
335                 }
336             }
337             startWorking();
338             while(awaitCount > 0)
339             {
340                 awaitSemaphore.wait();
341                 awaitCount--;
342             }
343         }
344         /**
345         *   If there is no idle thread, null will be returned and the task and onFinish callbacks will be executed on that same thread.
346         *   - Keep in mind that pushin task is not enough. You need to call startWorking() to make it active after pushing tasks
347         */
348         HipWorkerThread pushTask(string name, void delegate() task, void delegate(string taskName) onTaskFinish = null, bool isOnFinishOnMainThread = false)
349         {
350             handlersMutex.lock();
351             tasksCount++;
352             handlersMutex.unlock();
353             foreach(thread; threads)
354             {
355                 if(thread.isIdle)
356                 {
357                     if(onTaskFinish !is null && isOnFinishOnMainThread)
358                         thread.pushTask(name, task, notifyOnFinishOnMainThread(onTaskFinish));
359                     else
360                         thread.pushTask(name, task, notifyOnFinish(onTaskFinish));
361                     return thread;
362                 }
363             }
364             //Execute a main thread task if it had anything.
365             handlersMutex.lock();
366             mainThreadTasks~= Task(name, task, notifyOnFinish(onTaskFinish));
367             handlersMutex.unlock();
368             return null;
369         }
370 
371         protected void executeMainThreadTasks()
372         {
373             handlersMutex.lock();
374             if(mainThreadTasks.length != 0)
375             {
376                 foreach(mainThreadTask; mainThreadTasks)
377                 {
378                     mainThreadTask.task();
379                     if(mainThreadTask.onTaskFinish != null)
380                         mainThreadTask.onTaskFinish(mainThreadTask.name);
381                 }
382                 mainThreadTasks.length = 0;
383             }
384             handlersMutex.unlock();
385         }
386 
387         /**
388         *   This function should be called every time you push a task.
389         */
390         void startWorking()
391         {
392             foreach(thread; threads)
393                 if(!thread.isIdle)
394                     thread.startWorking();
395             executeMainThreadTasks();
396         }
397 
398         void delegate(string name) notifyOnFinish(void delegate(string taskName) onFinish = null)
399         {
400             return (name)
401             {
402                 handlersMutex.lock();
403                     if(onFinish)
404                         onFinish(name);
405                     tasksCount--;
406                 handlersMutex.unlock();
407             };
408         }
409 
410         void delegate(string name) notifyOnFinishOnMainThread(void delegate(string taskName) onFinish, bool finished = true)
411         {
412             return (name)
413             {
414                 handlersMutex.lock();
415                     finishHandlersOnMainThread~= ()
416                     {
417                         onFinish(name);
418                         if(finished)
419                             tasksCount--;
420                     };
421                 handlersMutex.unlock();
422             };
423         }
424 
425         bool isIdle()
426         {
427             foreach(thread; threads)
428                 if(!thread.isIdle)
429                     return false;
430             return true;
431         }
432 
433         void pollFinished()
434         {
435             handlersMutex.lock();
436                 if(finishHandlersOnMainThread.length)
437                 {
438                     foreach(finishHandler; finishHandlersOnMainThread)
439                         finishHandler();
440                     finishHandlersOnMainThread.length = 0;
441                 }
442                 if(tasksCount == 0 && onAllTasksFinishHandlers.length)
443                 {
444                     foreach(onAllFinish; onAllTasksFinishHandlers)
445                         onAllFinish();
446                     onAllTasksFinishHandlers.length = 0;
447                 }
448             handlersMutex.unlock();
449 
450         }
451 
452         void dispose()
453         {
454             foreach(thread; threads)
455                 thread.dispose();
456             destroy(threads);
457             destroy(awaitSemaphore);
458             destroy(handlersMutex);
459         }
460     }
461 
462 }
463 else
464 {
465     class DebugMutex
466     {
467         this(ulong id = 0){}
468         final void lock(){}
469         final void unlock(){}
470     }
471     class HipWorkerPool
472     {
473         private HipWorkerThread thread;
474         protected void delegate()[] onAllTasksFinishHandlers;
475         private void delegate()[] finishHandlersOnMainThread;
476         size_t tasksCount = 0;
477         void addOnAllTasksFinished(void delegate() onAllFinished)
478         {
479             if(tasksCount == 0)
480                 onAllFinished();
481             else
482                 onAllTasksFinishHandlers~= onAllFinished;
483         }
484 
485         this(size_t poolSize)
486         {
487             thread = new HipWorkerThread(this, ulong.max);
488         }
489         void delegate(string name) notifyOnFinishOnMainThread(void delegate(string taskName) onFinish, bool finished = true)
490         {
491             return (name)
492             {
493                 finishHandlersOnMainThread~= ()
494                 {
495                     onFinish(name); 
496                     if(finished)
497                         tasksCount--;
498                 };
499             };
500         }
501 
502         void delegate(string name) notifyOnFinish(void delegate(string taskName) onFinish)
503         {
504             return (name)
505             {
506                 if(onFinish) onFinish(name);
507                 version(WebAssembly){}
508                 else
509                     tasksCount--;
510             };
511         }
512         final void signalTaskFinish()
513         {
514             assert(tasksCount > 0, "Tried to signal task finish without tasks.");
515             tasksCount--;
516         }
517         final void await()
518         {
519             version(WebAssembly) assert(false, "Code using await does not work on WebAssembly.");
520         }
521         final void pollFinished()
522         {
523             if(finishHandlersOnMainThread.length)
524             {
525                 foreach(handler; finishHandlersOnMainThread)
526                     handler();
527                 finishHandlersOnMainThread.length = 0;
528             }
529             if(tasksCount == 0 && onAllTasksFinishHandlers.length)
530             {
531                 foreach(onAllFinish; onAllTasksFinishHandlers)
532                     onAllFinish();
533                 onAllTasksFinishHandlers.length = 0;
534             }
535         }
536         final HipWorkerThread pushTask(string name, void delegate() task, void delegate(string taskName) onTaskFinish = null, bool isOnFinishOnMainThread = true)
537         {
538             tasksCount++;
539             version(WebAssembly)
540                 assert(onTaskFinish is null, "Can't have an onTaskFinish on Wasm, implement it on a higher level using notfyOnFinish.");
541             thread.pushTask(name, task, notifyOnFinish(onTaskFinish));
542             return thread;
543         }
544         final void startWorking(){thread.startWorking();}
545         final void finish(){}
546         final bool isIdle(){return thread.isIdle;}
547         final void dispose(){}
548     }
549     class HipWorkerThread
550     {
551         struct WorkerTask
552         {
553             void delegate() task;
554             void delegate(string taskName) onTaskFinish;
555             string name;
556         }
557         WorkerTask[] tasks;
558 
559         this(HipWorkerPool pool, ulong id){}
560         final void pushTask(string name, void delegate() task, void delegate(string taskName) onTaskFinish = null)
561         {
562             tasks~= WorkerTask(task, onTaskFinish, name);
563         }
564 
565         final void startWorking()
566         {
567             foreach(task; tasks)           
568             {
569                 task.task();
570                 if(task.onTaskFinish)
571                     task.onTaskFinish(task.name);
572             }
573             tasks.length = 0;
574         }
575 
576         bool isIdle(){return tasks.length == 0;}
577     }
578 }